package com.atguigu.app.dwd;

import com.atguigu.utils.KafkaUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

//数据流：Mock -> Mysql -> Maxwell -> Kafka(ODS) -> FlinkApp -> Kafka(DWD)
//程  序：Mock -> Mysql -> Maxwell -> Kafka(ZK) -> DwdTradeCartAdd -> Kafka(ZK)
public class DwdTradeCartAdd {

    //1  定义来源 topic_db (多层级json)
    //2  定义来源 lookup表   base_dic ( mysql  ,hbase)
    //3  sql 从topic_db中提取加购事实数据
    //4  用lookupjoin 关联base_dic
    //5  定义目标表 dwd_trade_cart_add
    //6 写入目标表  insert into select xxxx from xxx
    public static void main(String[] args) {
        //0环境准备
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        //1  定义来源 topic_db (多层级json)
        //CREATE TABLE topic_db (
        //  `database`    STRING,
        //  `table` STRING,
        //  `type` STRING,
        //  `ts` STRING,
        //  `data` MAP<STRING,STRING>,
        //  `old` MAP<STRING,STRING>,
        //  `proc_time` as proctime()
        //) WITH (
        //  'connector' = 'kafka',
        //  'topic' = 'topic_db',
        //  'properties.bootstrap.servers' = 'hadoop102:9092',
        //  'properties.group.id' = 'dwd_trade_cart_add',
        //  'scan.startup.mode' = 'group-offsets',
        //  'format' = 'json'
        //);

        String createTopicDBSQL = "  CREATE TABLE topic_db (\n" +
                "         `database`    STRING,\n" +
                "          `table` STRING,\n" +
                "         `type` STRING,\n" +
                "           `ts` STRING,\n" +
                "           `data` MAP<STRING,STRING>,\n" +
                "           `old` MAP<STRING,STRING>,\n" +
                "          `proc_time` as proctime()\n" +
                "         ) WITH (\n" +
                "            'connector' = 'kafka',\n" +
                "          'topic' = 'topic_db',\n" +
                "           'properties.bootstrap.servers' = 'hadoop102:9092',\n" +
                "           'properties.group.id' = 'dwd_trade_cart_add',\n" +
                "           'scan.startup.mode' = 'group-offsets',\n" +
                "           'format' = 'json'\n" +
                "         )";
        tableEnv.executeSql(createTopicDBSQL);


        //2  定义来源 lookup表   base_dic ( mysql  ,hbase)
        //   CREATE TABLE base_dic (
        //        dic_code STRING,
        //        dic_name STRING,
        //        PRIMARY KEY (dic_code) NOT ENFORCED
        //  ) WITH (
        //     'connector' = 'jdbc' ,
        //     'url' = 'jdbc:phoenix:hadoop102:2181',
        //     'table-name' = 'GMALL2022.DIM_BASE_DIC'
        //   ) ;
        String createBaseDicSQL = "  CREATE TABLE base_dic (\n" +
                "                dic_code STRING,\n" +
                "               dic_name STRING,\n" +
                "               PRIMARY KEY (dic_code) NOT ENFORCED\n" +
                "         ) WITH (\n" +
                "             'connector' = 'jdbc' ,\n" +
                "             'url' = 'jdbc:mysql://hadoop102:3306/gmall-220718-flink?user=root&password=000000&useUnicode=true&characterEncoding=utf8&serverTimeZone=Asia/Shanghai&useSSL=false',\n" +
                "            'table-name' = 'base_dic',\n" +
                "          'lookup.cache.max-rows' = '1000',\n" +
                "          'lookup.cache.ttl' = '60s'\n" +
                "         )  ";

        tableEnv.executeSql(createBaseDicSQL);
        /// tableEnv.executeSql("select * from base_dic").print();


        //3  sql 从topic_db中提取加购事实数据
        //  select
        //    `data`['id'] ,
        //    `data`['user_id'] ,
        //    `data`['sku_id'] ,
        //    `data`['source_id'] ,
        //    `data`['source_type'] ,
        //    `ts`  ,
        // cast( `data`['sku_num'] as  INTEGER) -  cast( `old`['sku_num']  as INTEGER ) as sku_num
        // from topic_db   where   `table`='cart_info'
        //  and  ( type='insert'  or
        //    ( type='update' and   cast( `old`['sku_num'] as  INTEGER) -  cast( `data`['sku_num']  as INTEGER )<0)
        //  )
        String selectCartAddSql = "       select\n" +
                "            `data`['id']  as id ,\n" +
                "            `data`['user_id'] as user_id,\n" +
                "            `data`['sku_id'] as sku_id,\n" +
                "            `data`['source_id'] as source_id,\n" +
                "           `data`['source_type']  as source_type,\n" +
                "            `ts`  ,\n" +
                "       if(type='insert', `data`['sku_num'],  cast( (cast( `data`['sku_num'] as  INTEGER) -  cast( `old`['sku_num']  as INTEGER )) as STRING) )as sku_num ,\n" +
                "       `proc_time`  " +
                "from topic_db   where   `table`='cart_info'\n" +
                "           and  ( type='insert'  or\n" +
                "            ( type='update' and   cast( `old`['sku_num'] as  INTEGER) -  cast( `data`['sku_num']  as INTEGER )<0)\n" +
                "          )";

        Table table = tableEnv.sqlQuery(selectCartAddSql);
        //  tableEnv.executeSql("select * from "+table).print();
        tableEnv.createTemporaryView("v_cart_add", table);


        //4  用lookupjoin 关联base_dic
        // select  vc.* ,vc.dic_name as source_type_name   from v_cart_add vc
        // join base_dic FOR SYSTEM_TIME AS OF vc.proc_time AS bd
        // on vc.source_type=vc.dic_code

        String lookupJoinSQL = "select  vc.id,vc.user_id,vc.sku_id,vc.source_id,vc.source_type, bd.dic_name, vc.sku_num,ts   from v_cart_add vc\n" +
                "         join base_dic FOR SYSTEM_TIME AS OF vc.proc_time AS bd\n" +
                "       on vc.source_type=bd.dic_code";

        // tableEnv.executeSql(lookupJoinSQL).print();

        //5  定义目标表 dwd_trade_cart_add
        String createTradeCartAddSQL = "       " +
                " CREATE TABLE dwd_trade_cart_add (\n" +
                "           id STRING,\n" +
                "           user_id STRING,\n" +
                "           sku_id STRING,\n" +
                "           source_id STRING,\n" +
                "           source_type_code STRING,\n" +
                "           source_type_name STRING,\n" +
                "           sku_num STRING,\n" +
                "           ts STRING\n" +
                "         ) " + KafkaUtil.getKafkaSinkDDL("dwd_trade_cart_add");
        tableEnv.executeSql(createTradeCartAddSQL);

        //6 写入目标表  insert into select xxxx from xxx
        String insertCartAddSQL = "insert into dwd_trade_cart_add " + lookupJoinSQL;

        tableEnv.executeSql(insertCartAddSQL);

    }
}
